package com.hngy.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {

    //单词统计
    public static void main(String[] args) {
        //第一步：创建SparkContext：
        //注意，针对java代码需要获取JavaSparkContext
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCountJava")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //加载数据
        String path = "D:\\b.txt";
        if(args.length==1){
            path = args[0];
        }
        JavaRDD<String> linesRDD = sc.textFile(path);

        //对数据进行切割，把一行数据切分成一个一个的单词
        //注意：FlatMapFunction的泛型，第一个参数表示输入数据类型，第二个表示
        JavaRDD<String> wordRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //测试打印
        /*wordRDD.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });*/

        //迭代words,将每个word转化为(word,1)这种形式
        //注意：PairFunction的泛型，第一个参数是输入数据类型
        //第二个是输出tuple中的第一个参数类型，第三个是输出tuple中的第二个参数类型
        //注意：如果后面需要使用到....ByKey，前面都需要使用mapToPair去处理
        JavaPairRDD<String,Integer> pairRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        });

        //根据key(其实就是word)进行分组聚合统计
        JavaPairRDD<String,Integer> wordCountRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1 + i2;
            }
        });

        //将结果打印到控制台
        wordCountRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> tup) throws Exception {
                System.out.println(tup._1 + "--" + tup._2);
            }
        });

        //停止SparkContext
        sc.stop();
    }
}
